{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Amazon Glue DataBrew"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Prerequisites"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Add The Following Trust Relationship To Your IAM Role\n",
    "\n",
    "```\n",
    "    {\n",
    "      \"Sid\": \"\",\n",
    "      \"Effect\": \"Allow\",\n",
    "      \"Principal\": {\n",
    "        \"Service\": \"databrew.amazonaws.com\"\n",
    "      },\n",
    "      \"Action\": \"sts:AssumeRole\"\n",
    "    }\n",
    "```\n",
    "\n",
    "and Access Policy For Glue DataBrew."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Imports and Settings"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sagemaker\n",
    "import boto3\n",
    "\n",
    "sess   = sagemaker.Session()\n",
    "bucket = sess.default_bucket()\n",
    "role = sagemaker.get_execution_role()\n",
    "region = boto3.Session().region_name\n",
    "\n",
    "sts = boto3.Session().client(service_name='sts')\n",
    "db = boto3.Session().client('databrew')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "account_id = sts.get_caller_identity().get('Account') "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "import pandas as pd\n",
    "from botocore.exceptions import ClientError"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create Dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "timestamp = int(time.time())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "dataset_name = 'reviews-dataset-{}'.format(timestamp)\n",
    "input_bucket='amazon-reviews-pds'\n",
    "key='parquet/'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "ename": "AccessDeniedException",
     "evalue": "An error occurred (AccessDeniedException) when calling the CreateDataset operation: User: arn:aws:sts::405759480474:assumed-role/mod-caf61d640fbd4ba7-SageMakerExecutionRole-1U3FI8J98QOSN/SageMaker is not authorized to perform: databrew:CreateDataset on resource: arn:aws:databrew:us-east-1:405759480474:* with an explicit deny",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mAccessDeniedException\u001b[0m                     Traceback (most recent call last)",
      "\u001b[0;32m<ipython-input-6-826b459ec2a7>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m      4\u001b[0m         'S3InputDefinition': {\n\u001b[1;32m      5\u001b[0m             \u001b[0;34m'Bucket'\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0minput_bucket\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 6\u001b[0;31m             \u001b[0;34m'Key'\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mkey\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m      7\u001b[0m         },\n\u001b[1;32m      8\u001b[0m     },\n",
      "\u001b[0;32m/opt/conda/lib/python3.7/site-packages/botocore/client.py\u001b[0m in \u001b[0;36m_api_call\u001b[0;34m(self, *args, **kwargs)\u001b[0m\n\u001b[1;32m    355\u001b[0m                     \"%s() only accepts keyword arguments.\" % py_operation_name)\n\u001b[1;32m    356\u001b[0m             \u001b[0;31m# The \"self\" in this scope is referring to the BaseClient.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 357\u001b[0;31m             \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_make_api_call\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0moperation_name\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    358\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    359\u001b[0m         \u001b[0m_api_call\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__name__\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mstr\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpy_operation_name\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/opt/conda/lib/python3.7/site-packages/botocore/client.py\u001b[0m in \u001b[0;36m_make_api_call\u001b[0;34m(self, operation_name, api_params)\u001b[0m\n\u001b[1;32m    674\u001b[0m             \u001b[0merror_code\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mparsed_response\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"Error\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m{\u001b[0m\u001b[0;34m}\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"Code\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    675\u001b[0m             \u001b[0merror_class\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mexceptions\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfrom_code\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0merror_code\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 676\u001b[0;31m             \u001b[0;32mraise\u001b[0m \u001b[0merror_class\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mparsed_response\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0moperation_name\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    677\u001b[0m         \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    678\u001b[0m             \u001b[0;32mreturn\u001b[0m \u001b[0mparsed_response\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;31mAccessDeniedException\u001b[0m: An error occurred (AccessDeniedException) when calling the CreateDataset operation: User: arn:aws:sts::405759480474:assumed-role/mod-caf61d640fbd4ba7-SageMakerExecutionRole-1U3FI8J98QOSN/SageMaker is not authorized to perform: databrew:CreateDataset on resource: arn:aws:databrew:us-east-1:405759480474:* with an explicit deny"
     ]
    }
   ],
   "source": [
    "response = db.create_dataset(\n",
    "    Name=dataset_name,\n",
    "    Input={\n",
    "        'S3InputDefinition': {\n",
    "            'Bucket': input_bucket,\n",
    "            'Key': key\n",
    "        },\n",
    "    },\n",
    ")\n",
    "\n",
    "print(json.dumps(response, indent=4, sort_keys=True, default=str))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(HTML('<b>Review <a target=\"blank\" href=\"https://console.aws.amazon.com/databrew/home?region={}#dataset-details?dataset={}&tab=preview\">Dataset</a></b>'.format(region, dataset_name)))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "response = db.describe_dataset(\n",
    "    Name=dataset_name\n",
    ")\n",
    "\n",
    "print(json.dumps(response, indent=4, sort_keys=True, default=str))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Get Dataset Resource ARN"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dataset_arn = response['ResourceArn']\n",
    "print(dataset_name)\n",
    "print(dataset_arn)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create Recipe"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "recipe_name='reviews-dataset-recipe-{}'.format(timestamp)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# View Recipe File"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "!pygmentize ./amazon-reviews-dataset-recipe.json"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Load Recipe File"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "# Read file\n",
    "with open('./amazon-reviews-dataset-recipe.json', 'r') as file:\n",
    "    file_object=file.read()\n",
    "\n",
    "# Parse file\n",
    "recipe_steps = json.loads(file_object)\n",
    "\n",
    "print(json.dumps(recipe_steps, indent=4, sort_keys=True, default=str))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create Recipe From File"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = db.create_recipe(\n",
    "    Description='Amazon Customer Reviews Recipe',\n",
    "    Name=recipe_name,\n",
    "    Steps=recipe_steps\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(json.dumps(response, indent=4, sort_keys=True, default=str))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Project"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "project_name = 'reviews-dataset-project-{}'.format(timestamp)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = db.create_project(\n",
    "    DatasetName=dataset_name,\n",
    "    Name=project_name,\n",
    "    RecipeName=recipe_name,\n",
    "    Sample={\n",
    "        'Size': 500,\n",
    "        'Type': 'FIRST_N'\n",
    "    },\n",
    "    RoleArn=role\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(json.dumps(response, indent=4, sort_keys=True, default=str))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(HTML('<b>Review <a target=\"blank\" href=\"https://console.aws.amazon.com/databrew/home?region={}#project-workspace?project={}&view=grid\">Project</a></b>'.format(region, project_name)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create Recipe Job"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "job_name = 'reviews-dataset-recipe-job-{}'.format(timestamp)\n",
    "output_bucket = bucket\n",
    "output_key = 'databrew/'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# TODO: Add Data Brew Trust Relation to IAM Role"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = db.create_recipe_job(\n",
    "#    DatasetName=dataset_name,\n",
    "    Name=job_name,\n",
    "    LogSubscription='ENABLE',\n",
    "    MaxCapacity=10,\n",
    "    MaxRetries=0,\n",
    "    Outputs=[\n",
    "        {\n",
    "            'Format': 'CSV',\n",
    "            'PartitionColumns': [],\n",
    "            'Location': {\n",
    "                'Bucket': output_bucket,\n",
    "                'Key': output_key\n",
    "            },\n",
    "            'Overwrite': True\n",
    "        },\n",
    "    ],\n",
    "    ProjectName=project_name,\n",
    "#     RecipeReference={\n",
    "#         'Name': recipe_name\n",
    "#     },\n",
    "    RoleArn=role,\n",
    "    Timeout=2880\n",
    ")\n",
    "\n",
    "print(json.dumps(response, indent=4, sort_keys=True, default=str))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(HTML('<b>Review <a target=\"blank\" href=\"https://console.aws.amazon.com/databrew/home?region={}#job-details?job={}&tab=history\">Recipe Job</a></b>'.format(region, job_name)))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Start Job Run"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = db.start_job_run(\n",
    "    Name=job_name\n",
    ")\n",
    "\n",
    "print(json.dumps(response, indent=4, sort_keys=True, default=str))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Get Job Run ID"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "job_run_id = response['RunId']\n",
    "print(job_run_id)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# List Job Run"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = db.list_job_runs(\n",
    "    Name=job_name\n",
    ")\n",
    "\n",
    "print(json.dumps(response, indent=4, sort_keys=True, default=str))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "status = response['JobRuns'][0]['State']\n",
    "print(status)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# _Wait For The Job Run To Complete. The Job Runs For About 30min._"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "import time\n",
    "\n",
    "response = db.list_job_runs(Name=job_name)\n",
    "\n",
    "while response['JobRuns'][0]['State'] == 'RUNNING':\n",
    "    response = db.list_job_runs(Name=job_name)\n",
    "    status = response['JobRuns'][0]['State']\n",
    "    print('Job Run State: {}'.format(status))\n",
    "    time.sleep(15)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Review S3 Bucket With CSV File"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(HTML('<b>Review <a target=\"blank\" href=\"https://console.aws.amazon.com/s3/buckets/{}?region={}&prefix={}\">S3 Bucket</a></b>'.format(output_bucket, region, output_key)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Show the CSV Files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "part_file='{}_part00000.csv'.format(job_name)\n",
    "print(part_file)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "s3_output_bucket='{}/{}'.format(output_bucket, output_key)\n",
    "print(s3_output_bucket)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 cp s3://$s3_output_bucket$part_file ./"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import csv\n",
    "\n",
    "df_reviews = pd.read_csv('./amazon-customer-reviews-dataset-recipe-job_part00000.csv')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_reviews.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%javascript\n",
    "\n",
    "try {\n",
    "    Jupyter.notebook.save_checkpoint();\n",
    "    Jupyter.notebook.session.delete();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}"
   ]
  }
 ],
 "metadata": {
  "instance_type": "ml.t3.medium",
  "kernelspec": {
   "display_name": "Python 3 (Data Science)",
   "language": "python",
   "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
